This notebook illustrates building a recurrent neural network (RNN) with TensorFlow to predict the next values of a synthetic time series, training it with the Estimator API, and using the trained model for one-step and multi-step prediction.
This notebook is based on work originally published by Valliappa Lakshmanan.
Send any feedback to datalab-feedback@google.com.
In [1]:
import numpy as np
import seaborn as sns
import pandas as pd

SEQ_LEN = 40

def create_time_series():
  """Returns one sine-wave series with random frequency and amplitude."""
  freq = (np.random.random() * 0.5) + 0.1  # 0.1 to 0.6
  ampl = np.random.random() + 0.5  # 0.5 to 1.5
  x = np.sin(np.arange(0, SEQ_LEN) * freq) * ampl
  return x

for i in range(0, 5):
  sns.tsplot(create_time_series());  # plot 5 example series
In [2]:
# Save the data to disk.
def to_csv(filename, N):
  """Writes N random series to filename, one comma-separated series per line."""
  with open(filename, 'w') as ofp:
    for lineno in range(0, N):
      seq = create_time_series()
      line = ",".join(map(str, seq))
      ofp.write(line + '\n')

to_csv('train.csv', 1003)
to_csv('eval.csv', 108)
In [3]:
!head -1 eval.csv
Each sequence in our CSV files consists of 40 numbers. The model's task is to predict the next number given the preceding numbers as history. For training, values 0~38 of each instance are the inputs and values 1~39 (the same sequence shifted by one) are the targets. For prediction, the task is: given a series of numbers, predict the next n numbers.
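To make the input/target shift concrete, here is a minimal NumPy sketch on a toy sequence (the variable names and values are illustrative only, not part of the pipeline):

import numpy as np

seq = np.array([0.0, 0.8, 0.9, 0.1, -0.7])  # a toy "series" of 5 values
x = seq[:-1]  # inputs:  values 0..3
y = seq[1:]   # targets: values 1..4 (the inputs shifted by one step)
print(x)  # [ 0.   0.8  0.9  0.1]
print(y)  # [ 0.8  0.9  0.1 -0.7]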
We will create a recurrent neural network (RNN) model using TensorFlow.
For more info on RNN, see:
We will use TensorFlow's Estimator to build our model. An Estimator constructs the training, evaluation, and prediction graphs from a single model function, reusing the common graph and differing only where needed (for example, in the input_fn). It also handles model export; exported models can be deployed to Google Cloud ML Engine for online prediction.
In [4]:
import tensorflow as tf
import shutil
import tensorflow.contrib.learn as tflearn
import tensorflow.contrib.layers as tflayers
from tensorflow.contrib.learn.python.learn import learn_runner
from tensorflow.contrib.learn.python.learn.utils import saved_model_export_utils
import tensorflow.contrib.rnn as rnn

# tf.decode_csv requires DEFAULTS to infer data types and default values.
DEFAULTS = [[0.0] for x in range(0, SEQ_LEN)]
# The Estimator API requires named features.
TIMESERIES_FEATURE_NAME = 'rawdata'
# Training batch size.
BATCH_SIZE = 25
In [5]:
def create_input_fn(filename, mode=tf.contrib.learn.ModeKeys.TRAIN):
  """Creates an input_fn for the Estimator, for training or evaluation."""
  def _input_fn():
    """Returns named features and labels, as required by Estimator."""
    # filename could be a path to one file or a file pattern.
    input_file_names = tf.train.match_filenames_once(filename)
    filename_queue = tf.train.string_input_producer(
        input_file_names, num_epochs=None, shuffle=True)
    reader = tf.TextLineReader()
    _, value = reader.read_up_to(filename_queue, num_records=BATCH_SIZE)
    # Parse the CSV values.
    batch_data = tf.decode_csv(value, record_defaults=DEFAULTS)
    batch_data = tf.transpose(batch_data)  # [BATCH_SIZE, SEQ_LEN]
    # Get x and y. They are both of shape [BATCH_SIZE, SEQ_LEN - 1]:
    # x is the sequence without its last value, y is the sequence shifted by one.
    batch_len = tf.shape(batch_data)[0]
    x = tf.slice(batch_data, [0, 0], [batch_len, SEQ_LEN - 1])
    y = tf.slice(batch_data, [0, 1], [batch_len, SEQ_LEN - 1])
    return {TIMESERIES_FEATURE_NAME: x}, y  # dict of features, target
  return _input_fn
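As an optional sanity check (a sketch added here, not part of the original notebook), you can run the input_fn once in a local session and confirm the tensor shapes it produces:

# Sketch: pull one batch from the training input_fn and inspect its shape.
with tf.Graph().as_default():
  features, labels = create_input_fn('train.csv')()
  with tf.Session() as sess:
    # match_filenames_once creates a local variable; initialize both kinds.
    sess.run([tf.global_variables_initializer(), tf.local_variables_initializer()])
    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(sess=sess, coord=coord)
    x_val, y_val = sess.run([features[TIMESERIES_FEATURE_NAME], labels])
    print(x_val.shape, y_val.shape)  # expect (25, 39) (25, 39)
    coord.request_stop()
    coord.join(threads)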
Following the Estimator's requirements, we will create a model_fn representing the inference model. Note that this function defines the graph that will be used in training, evaluation, and prediction.
A model function supplied to the Estimator API must return a ModelFnOps; the rest of the function builds the objects that go into it.
In [6]:
# We will use one LSTM layer with this many units.
LSTM_SIZE = 10

def model_fn(features, targets, mode):
  """Defines the inference model."""
  uniform_initializer = tf.random_uniform_initializer(minval=-0.08, maxval=0.08)
  input_seq = features[TIMESERIES_FEATURE_NAME]
  # The RNN requires an input tensor of rank 3. Add a feature dimension of 1.
  input_seq = tf.expand_dims(input_seq, axis=-1)
  # LSTM output will be [BATCH_SIZE, SEQ_LEN - 1, LSTM_SIZE].
  lstm_cell = rnn.BasicLSTMCell(LSTM_SIZE)
  lstm_outputs, _ = tf.nn.dynamic_rnn(cell=lstm_cell,
                                      inputs=input_seq,
                                      dtype=tf.float32)
  # Reshape to [BATCH_SIZE * (SEQ_LEN - 1), LSTM_SIZE] so it is 2-D and can
  # be fed to the next layer.
  lstm_outputs = tf.reshape(lstm_outputs, [-1, lstm_cell.output_size])
  # Add hidden layers on top of the LSTM layer to add some nonlinearity.
  hidden1 = tf.contrib.layers.fully_connected(inputs=lstm_outputs,
                                              num_outputs=100,
                                              activation_fn=tf.nn.relu,
                                              weights_initializer=uniform_initializer,
                                              biases_initializer=uniform_initializer)
  hidden2 = tf.contrib.layers.fully_connected(inputs=hidden1,
                                              num_outputs=50,
                                              activation_fn=tf.nn.relu,
                                              weights_initializer=uniform_initializer,
                                              biases_initializer=uniform_initializer)
  predictions = tf.contrib.layers.fully_connected(inputs=hidden2,
                                                  num_outputs=1,
                                                  activation_fn=None,
                                                  weights_initializer=uniform_initializer,
                                                  biases_initializer=uniform_initializer)
  # Predictions are all we need when mode is not train/eval.
  predictions_dict = {"predicted": predictions}
  # For training and evaluation we also need to compute a loss.
  # For training we additionally need an optimizer.
  loss, train_op, eval_metric_ops = None, None, None
  if mode == tf.contrib.learn.ModeKeys.TRAIN or mode == tf.contrib.learn.ModeKeys.EVAL:
    # Note: the reshapes below are needed because the Estimator needs to know
    # the loss shape. Without them, the loss's shape would be unknown.
    targets = tf.reshape(targets, [tf.size(targets)])
    predictions = tf.reshape(predictions, [tf.size(predictions)])
    loss = tf.losses.mean_squared_error(targets, predictions)
    eval_metric_ops = {
        "rmse": tf.metrics.root_mean_squared_error(targets, predictions)
    }
  if mode == tf.contrib.learn.ModeKeys.TRAIN:
    # The learning rate here is unusually high, because we don't add any noise
    # to the training/evaluation data, so overfitting is not a big problem.
    train_op = tf.contrib.layers.optimize_loss(
        loss=loss,
        global_step=tf.contrib.framework.get_global_step(),
        learning_rate=0.1,
        optimizer="Adagrad")
  # Return ModelFnOps, as the Estimator requires.
  return tflearn.ModelFnOps(
      mode=mode,
      predictions=predictions_dict,
      loss=loss,
      train_op=train_op,
      eval_metric_ops=eval_metric_ops)
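As a quick illustration of the shape bookkeeping above (a standalone sketch, not part of the model), the expand/reshape steps look like this on a dummy batch:

# Sketch: trace the tensor shapes used inside model_fn.
with tf.Graph().as_default():
  dummy = tf.placeholder(tf.float32, [None, SEQ_LEN - 1])  # [batch, 39]
  expanded = tf.expand_dims(dummy, axis=-1)                # [batch, 39, 1]
  cell = rnn.BasicLSTMCell(LSTM_SIZE)
  outputs, _ = tf.nn.dynamic_rnn(cell, expanded, dtype=tf.float32)
  print(outputs.shape)                                     # (?, 39, 10)
  flat = tf.reshape(outputs, [-1, cell.output_size])       # [batch * 39, 10]
  print(flat.shape)                                        # (?, 10)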
Distributed training is launched using an Experiment. The key point here is that we use Estimator rather than, say, DNNRegressor. This allows us to provide a model_fn, which will be our RNN defined above. Note also that we specify a serving_input_fn; this is how we parse the input data provided at prediction time using gcloud or Cloud ML Engine online prediction.
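For reference, here is a hedged sketch of the kind of request an exported model could serve: each JSON instance carries the named feature declared by the serving_input_fn below. The file name request.json and the exact payload framing are illustrative assumptions, not a verified Cloud ML Engine format.

import json

# Hypothetical online-prediction payload: one JSON instance per line, keyed
# by the feature name that serving_input_fn exposes ('rawdata').
instance = {'rawdata': list(create_time_series()[:30])}
with open('request.json', 'w') as f:
  f.write(json.dumps(instance) + '\n')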
In [7]:
def get_train():
  return create_input_fn('train.csv', mode=tf.contrib.learn.ModeKeys.TRAIN)

def get_eval():
  return create_input_fn('eval.csv', mode=tf.contrib.learn.ModeKeys.EVAL)

def serving_input_fn():
  feature_placeholders = {
      TIMESERIES_FEATURE_NAME: tf.placeholder(tf.float32, [None, None])
  }
  return tflearn.utils.input_fn_utils.InputFnOps(
      feature_placeholders,
      None,
      feature_placeholders
  )

def experiment_fn(output_dir):
  """An experiment_fn, required by the Estimator API to run training."""
  estimator = tflearn.Estimator(
      model_fn=model_fn,
      model_dir=output_dir,
      config=tf.contrib.learn.RunConfig(save_checkpoints_steps=500))
  return tflearn.Experiment(
      estimator,
      train_input_fn=get_train(),
      eval_input_fn=get_eval(),
      export_strategies=[saved_model_export_utils.make_export_strategy(
          serving_input_fn,
          default_output_alternative_key=None,
          exports_to_keep=1
      )],
      train_steps=1000
  )

shutil.rmtree('training', ignore_errors=True)  # start fresh each time
learn_runner.run(experiment_fn, 'training')
Out[7]:
In [8]:
from google.datalab.ml import Summary
summary = Summary('./training')
summary.plot(['OptimizeLoss/loss', 'loss'])
In [9]:
prediction_data = create_time_series()
# First 30 values as x, last 10 values as y.
prediction_x = list(prediction_data[:30])
prediction_y = list(prediction_data[30:])
print('x\n%s\n' % prediction_x)
print('y\n%s' % prediction_y)
sns.tsplot(prediction_x, color='blue')
# Pad with NaNs so the truth curve starts where x ends; repeat the last x value
# so the first segment of the curve is drawn.
y_truth_curve = [np.nan] * (len(prediction_x) - 1) + [prediction_x[-1]] + prediction_y
sns.tsplot(y_truth_curve, color='green')
Out[9]:
The first prediction simply sends x; for each value in x, the model returns one predicted value. We can then compare the predicted values with the truth (x shifted by one).
In [10]:
# Load the model.
estimator = tflearn.Estimator(model_fn=model_fn, model_dir='training')
# Feed the prediction data. Wrap it in a float32 constant so the feature
# dtype matches the model.
predict_input_fn = lambda: {
    TIMESERIES_FEATURE_NAME: tf.constant([prediction_x], dtype=tf.float32)}
predicted = list(estimator.predict(input_fn=predict_input_fn))
# Each prediction comes back as a length-1 array; extract the scalar.
predicted = [p['predicted'][0] for p in predicted]
# Plot the prediction source.
sns.tsplot(prediction_x, color='green')
# Plot the predicted values, shifted by one step to align with the source.
sns.tsplot([prediction_x[0]] + predicted, color='red');
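As a quick, optional check (an addition, not in the original notebook), we can compute the root-mean-squared error of these one-step predictions against the true next values:

# One-step RMSE: the prediction at step i estimates the true value at step i + 1.
truth = np.array(prediction_x[1:])
pred = np.array(predicted[:len(truth)])
print('one-step RMSE: %f' % np.sqrt(np.mean((pred - truth) ** 2)))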
The next prediction sends x and predicts the following n values. We make n predictions; each time we keep only the last predicted value and append it to x as the source for the next prediction.
In [11]:
estimator = tflearn.Estimator(model_fn=model_fn, model_dir='training')
# The prediction data starts with x.
x_total = list(prediction_x)
# Make n predictions.
for i in range(len(prediction_y)):
  predict_input_fn = lambda: {
      TIMESERIES_FEATURE_NAME: tf.constant([x_total], dtype=tf.float32)}
  p = list(estimator.predict(input_fn=predict_input_fn))
  # At each step, append the last predicted value (a length-1 array; take the
  # scalar) to the source for the next prediction.
  x_total.append(p[-1]['predicted'][0])
# The first len(prediction_x) elements are the prediction source, so remove them.
y_predicted = x_total[len(prediction_x):]
# Blank out the prediction source (making it NaN), add the last value of the
# source so the first segment of the curve is drawn, then add the predicted values.
y_predicted_curve = [np.nan] * (len(prediction_x) - 1) + [prediction_x[-1]] + y_predicted
# Plot the prediction source.
sns.tsplot(prediction_x, color='blue')
# Plot the truth curve.
sns.tsplot(y_truth_curve, color='green')
# Plot the predicted curve.
sns.tsplot(y_predicted_curve, color='red');
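And an analogous optional check for the multi-step forecast (again, an addition to the original):

# Multi-step RMSE: compare the n recursively predicted values with the truth.
pred = np.array(y_predicted, dtype=np.float64)
truth = np.array(prediction_y)
print('multi-step RMSE: %f' % np.sqrt(np.mean((pred - truth) ** 2)))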